package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.QueueList;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.store.PersistenceAdapter;

/**
 * Message container manager for queue destinations.
 *
 * <p>For every queue it keeps a {@link QueueMessageContainer} (created lazily through the
 * {@link PersistenceAdapter}) together with two {@link QueueList}s: the consuming
 * subscriptions and the browser subscriptions attached to that container. Both lists are
 * stored in maps keyed by the container instance.
 *
 * <p>Changes to the subscription lists, container creation and {@link #poll()} all
 * synchronize on a single private mutex.
 */
public class QueueMessageContainerManager extends MessageContainerManagerSupport
{
  private static final Log log = LogFactory.getLog(QueueMessageContainerManager.class);

  /** Bound applied to the per-pass loop counter in {@link #doPeek} and {@link #doPoll}. */
  private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;

  private final PersistenceAdapter persistenceAdapter;
  protected SubscriptionContainer subscriptionContainer;
  protected FilterFactory filterFactory;

  /** Maps a {@link QueueMessageContainer} to the {@link QueueList} of its consuming subscriptions. */
  protected Map activeSubscriptions = new ConcurrentHashMap();

  /** Maps a {@link QueueMessageContainer} to the {@link QueueList} of its browser subscriptions. */
  protected Map browsers = new ConcurrentHashMap();

  /** Guards subscription list updates, container creation and polling. */
  private final Object subscriptionMutex = new Object();

  /**
   * Creates a manager using default subscription container, filter factory and dispatcher
   * implementations.
   *
   * @param persistenceAdapter used to create queue message containers on demand
   */
  public QueueMessageContainerManager(PersistenceAdapter persistenceAdapter) {
    this(persistenceAdapter, new SubscriptionContainerImpl(), new FilterFactoryImpl(), new DispatcherImpl());
  }

  /**
   * Creates a manager with explicitly supplied collaborators.
   *
   * @param persistenceAdapter used to create queue message containers on demand
   * @param subscriptionContainer registry used to create, look up and remove subscriptions
   * @param filterFactory builds the destination/selector filter for each consumer
   * @param dispatcher passed to the superclass and used to wake up subscriptions
   */
  public QueueMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
    super(dispatcher);
    this.persistenceAdapter = persistenceAdapter;
    this.subscriptionContainer = subscriptionContainer;
    this.filterFactory = filterFactory;
  }

  /**
   * Registers a consumer. Consumers whose destination is {@code null} or not a queue are
   * ignored by this manager.
   *
   * @param client the broker client owning the consumer
   * @param info describes the consumer (destination, selector, noLocal, ...)
   * @throws JMSException if the subscription or its filter cannot be created
   */
  public void addMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    if (log.isDebugEnabled()) {
      log.debug("Adding consumer: " + info);
    }
    // Same null guard as removeMessageConsumer/sendMessage: a missing destination is not ours.
    if ((info.getDestination() != null) && (info.getDestination().isQueue())) {
      Subscription sub = this.subscriptionContainer.makeSubscription(this.dispatcher, info, createFilter(info));
      this.dispatcher.addActiveSubscription(client, sub);
      updateActiveSubscriptions(sub);

      sub.setActive(true);
    }
  }

  /**
   * Unregisters a consumer. Consumers whose destination is {@code null} or not a queue are
   * ignored. The subscription is deactivated, cleared, detached from the dispatcher and
   * removed from the consumer and browser lists of every matching container; a list that
   * becomes empty is dropped from its map.
   *
   * @param client the broker client owning the consumer
   * @param info describes the consumer being removed
   * @throws JMSException declared by the interface; propagated from collaborators
   */
  public void removeMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    if (log.isDebugEnabled()) {
      log.debug("Removing consumer: " + info);
    }
    if ((info.getDestination() == null) || (!info.getDestination().isQueue())) {
      return;
    }
    synchronized (this.subscriptionMutex) {
      Subscription sub = this.subscriptionContainer.removeSubscription(info.getConsumerId());
      if (sub == null) {
        return;
      }
      sub.setActive(false);
      sub.clear();
      this.dispatcher.removeActiveSubscription(client, sub);

      for (Iterator iter = this.messageContainers.values().iterator(); iter.hasNext(); ) {
        QueueMessageContainer container = (QueueMessageContainer)iter.next();
        if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
          removeFromList(this.activeSubscriptions, container, sub);
          removeFromList(this.browsers, container, sub);
        }
      }
    }
  }

  /**
   * Removes {@code sub} from the list registered for {@code container} in {@code lists} and
   * drops the map entry once that list is empty. The maps are keyed by container, so the
   * container (not the destination name) must be used as the removal key. Does nothing when
   * no list is registered. Callers hold {@code subscriptionMutex}.
   */
  private void removeFromList(Map lists, QueueMessageContainer container, Subscription sub)
  {
    QueueList list = (QueueList)lists.get(container);
    if (list != null) {
      list.remove(sub);
      if (list.isEmpty()) {
        lists.remove(container);
      }
    }
  }

  /**
   * No-op in this manager.
   *
   * @param clientId ignored
   * @param subscriberName ignored
   * @throws JMSException never thrown by this implementation
   */
  public void deleteSubscription(String clientId, String subscriberName)
    throws JMSException
  {
  }

  /**
   * Adds the message to the container of its queue destination and wakes the dispatcher.
   * Messages without a destination, or with a non-queue destination, are ignored.
   *
   * @param client the sending client (unused here)
   * @param message the message to enqueue
   * @throws JMSException if the container cannot be obtained or the message cannot be added
   */
  public void sendMessage(BrokerClient client, ActiveMQMessage message)
    throws JMSException
  {
    ActiveMQDestination dest = (ActiveMQDestination)message.getJMSDestination();
    if ((dest != null) && (dest.isQueue())) {
      if (log.isDebugEnabled()) {
        log.debug("Dispaching message: " + message);
      }
      QueueMessageContainer container = (QueueMessageContainer)getContainer(dest.getPhysicalName());

      container.addMessage(message);
      this.dispatcher.wakeup();
    }
  }

  /**
   * Forwards an acknowledgement to the subscription identified by the ack's consumer id;
   * ignored when no such subscription is registered.
   *
   * @param client the acknowledging client (unused here)
   * @param ack the acknowledgement
   * @throws JMSException propagated from the subscription
   */
  public void acknowledgeMessage(BrokerClient client, MessageAck ack)
    throws JMSException
  {
    Subscription sub = this.subscriptionContainer.getSubscription(ack.getConsumerId());
    if (sub != null) {
      sub.messageConsumed(ack);
    }
  }

  /**
   * Notifies the subscription identified by the ack's consumer id of a transacted
   * acknowledgement before commit; ignored when no such subscription is registered.
   *
   * @param client the acknowledging client (unused here)
   * @param transactionId the transaction id (unused here)
   * @param ack the acknowledgement
   * @throws JMSException propagated from the subscription
   */
  public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException
  {
    Subscription sub = this.subscriptionContainer.getSubscription(ack.getConsumerId());
    if (sub != null) {
      sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
    }
  }

  /**
   * Asks the subscription identified by the ack's consumer id to redeliver the message;
   * ignored when no such subscription is registered.
   *
   * @param client the requesting client (unused here)
   * @param ack identifies the message to redeliver
   * @throws JMSException propagated from the subscription
   */
  public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException
  {
    Subscription sub = this.subscriptionContainer.getSubscription(ack.getConsumerId());
    if (sub != null) {
      // The first argument is passed as null, exactly as before.
      sub.redeliverMessage(null, ack);
    }
  }

  /**
   * Runs one dispatch pass: for every container with consumers, browsers are peeked and
   * consumers are polled; containers that only have browsers are peeked as well.
   *
   * @throws JMSException propagated from the containers or subscriptions
   */
  public void poll()
    throws JMSException
  {
    synchronized (this.subscriptionMutex) {
      for (Iterator iter = this.activeSubscriptions.keySet().iterator(); iter.hasNext(); ) {
        QueueMessageContainer container = (QueueMessageContainer)iter.next();

        doPeek(container, (QueueList)this.browsers.get(container));
        doPoll(container, (QueueList)this.activeSubscriptions.get(container));
      }
      // Containers that have browsers but no consumer list are not reached by the loop
      // above; serve their browsers here so browsing works without a consumer.
      for (Iterator iter = this.browsers.keySet().iterator(); iter.hasNext(); ) {
        QueueMessageContainer container = (QueueMessageContainer)iter.next();
        if (!this.activeSubscriptions.containsKey(container)) {
          doPeek(container, (QueueList)this.browsers.get(container));
        }
      }
    }
  }

  /**
   * No-op in this manager.
   *
   * @param client ignored
   * @param transactionId ignored
   */
  public void commitTransaction(BrokerClient client, String transactionId) {
  }

  /**
   * No-op in this manager.
   *
   * @param client ignored
   * @param transactionId ignored
   */
  public void rollbackTransaction(BrokerClient client, String transactionId) {
  }

  /**
   * Returns the container for the destination, creating, starting and registering it on
   * first use. A newly created container is offered to every known subscription so that
   * matching consumers and browsers are attached to it.
   *
   * @param destinationName physical name of the queue
   * @return the existing or newly created container
   * @throws JMSException if the container cannot be created or started
   */
  public MessageContainer getContainer(String destinationName) throws JMSException {
    synchronized (this.subscriptionMutex) {
      QueueMessageContainer container = (QueueMessageContainer)this.messageContainers.get(destinationName);
      if (container == null) {
        container = this.persistenceAdapter.createQueueMessageContainer(destinationName);
        container.start();
        this.messageContainers.put(destinationName, container);

        for (Iterator iter = this.subscriptionContainer.subscriptionIterator(); iter.hasNext(); ) {
          Subscription sub = (Subscription)iter.next();
          if (sub.isBrowser()) {
            updateBrowsers(container, sub);
          }
          else {
            updateActiveSubscriptions(container, sub);
          }
        }
      }
      return container;
    }
  }

  /**
   * Feeds each browser in the list with messages peeked from the container, starting after
   * the browser's last message identity. Peeking for a browser stops when the container
   * returns no message, the browser reaches its prefetch limit, or the loop counter passes
   * {@link #MAX_MESSAGES_DISPATCHED_FROM_POLL}.
   */
  private void doPeek(QueueMessageContainer container, QueueList browserList)
    throws JMSException
  {
    if (browserList == null) {
      return;
    }
    for (int i = 0; i < browserList.size(); i++) {
      SubscriptionImpl sub = (SubscriptionImpl)browserList.get(i);
      int count = 0;
      ActiveMQMessage msg;
      do {
        msg = container.peekNext(sub.getLastMessageIdentity());
        if (msg != null) {
          if (sub.isTarget(msg)) {
            sub.addMessage(container, msg);
            this.dispatcher.wakeup(sub);
          }
          else {
            // Not selected by this browser: record it as seen so the next peek moves past it.
            sub.setLastMessageIdentifier(msg.getJMSMessageIdentity());
          }
        }
      }
      while ((msg != null) && (!sub.isAtPrefetchLimit()) && (count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL));
    }
  }

  /**
   * Polls messages from the container and hands each one to the first subscription in the
   * list that targets it and is below its prefetch limit. The pass ends when the container
   * is drained, a message could not be dispatched, or the loop counter passes
   * {@link #MAX_MESSAGES_DISPATCHED_FROM_POLL}.
   */
  private void doPoll(QueueMessageContainer container, QueueList subList) throws JMSException
  {
    if ((subList == null) || (!(subList.size() > 0))) {
      return;
    }
    int count = 0;
    ActiveMQMessage msg;
    do {
      msg = container.poll();
      if (msg != null) {
        boolean dispatched = false;
        boolean targeted = false;
        for (QueueListEntry entry = subList.getFirstEntry(); entry != null; entry = subList.getNextEntry(entry)) {
          SubscriptionImpl sub = (SubscriptionImpl)entry.getElement();
          if (sub.isTarget(msg)) {
            targeted = true;
            if (!sub.isAtPrefetchLimit()) {
              sub.addMessage(container, msg);
              dispatched = true;
              this.dispatcher.wakeup(sub);
              // NOTE(review): rotate() presumably moves the head to the tail so consumers
              // are served in turn - confirm against QueueList.
              subList.rotate();
              break;
            }
          }
        }
        if (!dispatched) {
          if (targeted) {
            // A consumer wants it but all matching consumers are at their prefetch limit:
            // hand the message back to the container.
            container.returnMessage(msg.getJMSMessageIdentity());
          }
          // NOTE(review): an untargeted message is not returned here; presumably it becomes
          // available again via container.reset() when a subscription is added - confirm.
          break;
        }
      }
    }
    while ((msg != null) && (count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL));
  }

  /**
   * Offers the subscription to every registered container and makes sure the container for
   * the subscription's own destination exists and has been offered the subscription too.
   */
  private void updateActiveSubscriptions(Subscription subscription)
    throws JMSException
  {
    synchronized (this.subscriptionMutex) {
      boolean processedSubscriptionContainer = false;

      String subscriptionPhysicalName = subscription.getDestination().getPhysicalName();
      for (Iterator iter = this.messageContainers.entrySet().iterator(); iter.hasNext(); ) {
        Map.Entry entry = (Map.Entry)iter.next();
        String destinationName = (String)entry.getKey();
        QueueMessageContainer container = (QueueMessageContainer)entry.getValue();

        if (destinationName.equals(subscriptionPhysicalName)) {
          processedSubscriptionContainer = true;
        }
        processSubscription(subscription, container);
      }
      if (!processedSubscriptionContainer) {
        // No container registered under this name yet: create it on demand.
        processSubscription(subscription, (QueueMessageContainer)getContainer(subscriptionPhysicalName));
      }
    }
  }

  /**
   * Attaches the subscription to the container as a browser or as a consumer, depending on
   * {@link Subscription#isBrowser()}.
   *
   * @param subscription the subscription to attach
   * @param container the candidate container
   * @throws JMSException propagated from the container
   */
  protected void processSubscription(Subscription subscription, QueueMessageContainer container)
    throws JMSException
  {
    if (subscription.isBrowser()) {
      updateBrowsers(container, subscription);
    }
    else {
      updateActiveSubscriptions(container, subscription);
    }
  }

  /**
   * If the container's destination name equals the subscription's physical name, resets
   * the container and adds the subscription to its consumer list (once).
   */
  private void updateActiveSubscriptions(QueueMessageContainer container, Subscription sub)
    throws JMSException
  {
    if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
      container.reset();
      QueueList list = getSubscriptionList(container);
      if (!list.contains(sub)) {
        list.add(sub);
      }
    }
  }

  /** Returns the consumer list for the container, creating and registering it if absent. */
  private QueueList getSubscriptionList(QueueMessageContainer container)
  {
    QueueList list = (QueueList)this.activeSubscriptions.get(container);
    if (list == null) {
      list = new DefaultQueueList();
      this.activeSubscriptions.put(container, list);
    }
    return list;
  }

  /**
   * If the container's destination name equals the subscription's physical name, resets
   * the container and adds the subscription to its browser list (once).
   */
  private void updateBrowsers(QueueMessageContainer container, Subscription sub)
    throws JMSException
  {
    if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
      container.reset();
      QueueList list = getBrowserList(container);
      if (!list.contains(sub)) {
        list.add(sub);
      }
    }
  }

  /** Returns the browser list for the container, creating and registering it if absent. */
  private QueueList getBrowserList(QueueMessageContainer container)
  {
    QueueList list = (QueueList)this.browsers.get(container);
    if (list == null) {
      list = new DefaultQueueList();
      this.browsers.put(container, list);
    }
    return list;
  }

  /**
   * Builds the filter for a consumer from its destination and selector; when the consumer
   * is noLocal the result is additionally combined with a {@link NoLocalFilter} for the
   * consumer's client id.
   *
   * @param info describes the consumer
   * @return the filter to apply to candidate messages
   * @throws JMSException if the filter factory fails to create the filter
   */
  protected Filter createFilter(ConsumerInfo info)
    throws JMSException
  {
    Filter filter = this.filterFactory.createFilter(info.getDestination(), info.getSelector());
    if (info.isNoLocal()) {
      filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
    }
    return filter;
  }
}